# Copyright 2009-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Test the collection module."""
from __future__ import annotations

import asyncio
import contextlib
import re
import sys
from codecs import utf_8_decode
from collections import defaultdict
from test.asynchronous.utils import async_get_pool, async_is_mongos
from typing import Any, Iterable, no_type_check

from pymongo.asynchronous.database import AsyncDatabase

sys.path[0:0] = [""]

from test import unittest
from test.asynchronous import (  # TODO: fix sync imports in PYTHON-4528
    AsyncIntegrationTest,
    AsyncUnitTest,
    async_client_context,
)
from test.utils_shared import (
    IMPOSSIBLE_WRITE_CONCERN,
    EventListener,
    OvertCommandListener,
    async_wait_until,
)
from test.version import Version

from bson import encode
from bson.codec_options import CodecOptions
from bson.objectid import ObjectId
from bson.raw_bson import RawBSONDocument
from bson.regex import Regex
from bson.son import SON
from pymongo import ASCENDING, DESCENDING, GEO2D, GEOSPHERE, HASHED, TEXT
from pymongo.asynchronous.collection import AsyncCollection, ReturnDocument
from pymongo.asynchronous.command_cursor import AsyncCommandCursor
from pymongo.asynchronous.mongo_client import AsyncMongoClient
from pymongo.bulk_shared import BulkWriteError
from pymongo.cursor_shared import CursorType
from pymongo.errors import (
    ConfigurationError,
    DocumentTooLarge,
    DuplicateKeyError,
    ExecutionTimeout,
    InvalidDocument,
    InvalidName,
    InvalidOperation,
    OperationFailure,
    WriteConcernError,
)
from pymongo.message import _COMMAND_OVERHEAD, _gen_find_command
from pymongo.operations import *
from pymongo.read_concern import DEFAULT_READ_CONCERN
from pymongo.read_preferences import ReadPreference
from pymongo.results import (
    DeleteResult,
    InsertManyResult,
    InsertOneResult,
    UpdateResult,
)
from pymongo.write_concern import WriteConcern

_IS_SYNC = False


class TestCollectionNoConnect(AsyncUnitTest):
    """Test Collection features on a client that does not connect."""

    db: AsyncDatabase
    client: AsyncMongoClient

    async def asyncSetUp(self) -> None:
        await super().asyncSetUp()
        self.client = self.simple_client(connect=False)
        self.db = self.client.pymongo_test

    def test_collection(self):
        self.assertRaises(TypeError, AsyncCollection, self.db, 5)

        def make_col(base, name):
            return base[name]

        self.assertRaises(InvalidName, make_col, self.db, "")
        self.assertRaises(InvalidName, make_col, self.db, "te$t")
        self.assertRaises(InvalidName, make_col, self.db, ".test")
        self.assertRaises(InvalidName, make_col, self.db, "test.")
        self.assertRaises(InvalidName, make_col, self.db, "tes..t")
        self.assertRaises(InvalidName, make_col, self.db.test, "")
        self.assertRaises(InvalidName, make_col, self.db.test, "te$t")
        self.assertRaises(InvalidName, make_col, self.db.test, ".test")
        self.assertRaises(InvalidName, make_col, self.db.test, "test.")
        self.assertRaises(InvalidName, make_col, self.db.test, "tes..t")
        self.assertRaises(InvalidName, make_col, self.db.test, "tes\x00t")

    def test_getattr(self):
        coll = self.db.test
        self.assertIsInstance(coll["_does_not_exist"], AsyncCollection)

        with self.assertRaises(AttributeError) as context:
            coll._does_not_exist

        # Message should be:
        # "AttributeError: Collection has no attribute '_does_not_exist'. To
        # access the test._does_not_exist collection, use
        # database['test._does_not_exist']."
        self.assertIn("has no attribute '_does_not_exist'", str(context.exception))

        coll2 = coll.with_options(write_concern=WriteConcern(w=0))
        self.assertEqual(coll2.write_concern, WriteConcern(w=0))
        self.assertNotEqual(coll.write_concern, coll2.write_concern)
        coll3 = coll2.subcoll
        self.assertEqual(coll2.write_concern, coll3.write_concern)
        coll4 = coll2["subcoll"]
        self.assertEqual(coll2.write_concern, coll4.write_concern)

    def test_iteration(self):
        coll = self.db.coll
        msg = "'AsyncCollection' object is not iterable"
        # Iteration fails
        with self.assertRaisesRegex(TypeError, msg):
            for _ in coll:  # type: ignore[misc] # error: "None" not callable  [misc]
                break
        # Non-string indices will start failing in PyMongo 5.
        self.assertEqual(coll[0].name, "coll.0")
        self.assertEqual(coll[{}].name, "coll.{}")
        # next fails
        with self.assertRaisesRegex(TypeError, msg):
            _ = next(coll)
        # .next() fails
        with self.assertRaisesRegex(TypeError, msg):
            _ = coll.next()
        # Do not implement typing.Iterable.
        self.assertNotIsInstance(coll, Iterable)


class AsyncTestCollection(AsyncIntegrationTest):
    w: int

    async def asyncSetUp(self):
        await super().asyncSetUp()
        self.w = async_client_context.w  # type: ignore

    async def asyncTearDown(self):
        await self.db.test.drop()
        await self.db.drop_collection("test_large_limit")
        await super().asyncTearDown()

    @contextlib.contextmanager
    def write_concern_collection(self):
        if async_client_context.is_rs:
            with self.assertRaises(WriteConcernError):
                # Unsatisfiable write concern.
                yield AsyncCollection(
                    self.db,
                    "test",
                    write_concern=WriteConcern(w=len(async_client_context.nodes) + 1),
                )
        else:
            yield self.db.test

    async def test_equality(self):
        self.assertIsInstance(self.db.test, AsyncCollection)
        self.assertEqual(self.db.test, self.db["test"])
        self.assertEqual(self.db.test, AsyncCollection(self.db, "test"))
        self.assertEqual(self.db.test.mike, self.db["test.mike"])
        self.assertEqual(self.db.test["mike"], self.db["test.mike"])

    async def test_hashable(self):
        self.assertIn(self.db.test.mike, {self.db["test.mike"]})

    async def test_create(self):
        # No Exception.
        db = async_client_context.client.pymongo_test
        await db.create_test_no_wc.drop()

        async def lambda_test():
            return "create_test_no_wc" not in await db.list_collection_names()

        async def lambda_test_2():
            return "create_test_no_wc" in await db.list_collection_names()

        await async_wait_until(
            lambda_test,
            "drop create_test_no_wc collection",
        )
        await db.create_collection("create_test_no_wc")
        await async_wait_until(
            lambda_test_2,
            "create create_test_no_wc collection",
        )
        # SERVER-33317
        if not async_client_context.is_mongos or not async_client_context.version.at_least(3, 7, 0):
            with self.assertRaises(OperationFailure):
                await db.create_collection("create-test-wc", write_concern=IMPOSSIBLE_WRITE_CONCERN)

    async def test_drop_nonexistent_collection(self):
        await self.db.drop_collection("test")
        self.assertNotIn("test", await self.db.list_collection_names())

        # No exception
        await self.db.drop_collection("test")

    async def test_create_indexes(self):
        db = self.db

        with self.assertRaises(TypeError):
            await db.test.create_indexes("foo")  # type: ignore[arg-type]
        with self.assertRaises(TypeError):
            await db.test.create_indexes(["foo"])  # type: ignore[list-item]
        self.assertRaises(TypeError, IndexModel, 5)
        self.assertRaises(ValueError, IndexModel, [])

        await db.test.drop_indexes()
        await db.test.insert_one({})
        self.assertEqual(len(await db.test.index_information()), 1)

        await db.test.create_indexes([IndexModel("hello")])
        await db.test.create_indexes([IndexModel([("hello", DESCENDING), ("world", ASCENDING)])])

        # Tuple instead of list.
        await db.test.create_indexes([IndexModel((("world", ASCENDING),))])

        self.assertEqual(len(await db.test.index_information()), 4)

        await db.test.drop_indexes()
        names = await db.test.create_indexes(
            [IndexModel([("hello", DESCENDING), ("world", ASCENDING)], name="hello_world")]
        )
        self.assertEqual(names, ["hello_world"])

        await db.test.drop_indexes()
        self.assertEqual(len(await db.test.index_information()), 1)
        await db.test.create_indexes([IndexModel("hello")])
        self.assertIn("hello_1", await db.test.index_information())

        await db.test.drop_indexes()
        self.assertEqual(len(await db.test.index_information()), 1)
        names = await db.test.create_indexes(
            [IndexModel([("hello", DESCENDING), ("world", ASCENDING)]), IndexModel("hello")]
        )
        info = await db.test.index_information()
        for name in names:
            self.assertIn(name, info)

        await db.test.drop()
        await db.test.insert_one({"a": 1})
        await db.test.insert_one({"a": 1})
        with self.assertRaises(DuplicateKeyError):
            await db.test.create_indexes([IndexModel("a", unique=True)])

        with self.write_concern_collection() as coll:
            await coll.create_indexes([IndexModel("hello")])

    @async_client_context.require_version_max(4, 3, -1)
    async def test_create_indexes_commitQuorum_requires_44(self):
        db = self.db
        with self.assertRaisesRegex(
            ConfigurationError,
            r"Must be connected to MongoDB 4\.4\+ to use the commitQuorum option for createIndexes",
        ):
            await db.coll.create_indexes([IndexModel("a")], commitQuorum="majority")

    @async_client_context.require_no_standalone
    @async_client_context.require_version_min(4, 4, -1)
    async def test_create_indexes_commitQuorum(self):
        await self.db.coll.create_indexes([IndexModel("a")], commitQuorum="majority")

    async def test_create_index(self):
        db = self.db

        with self.assertRaises(TypeError):
            await db.test.create_index(5)  # type: ignore[arg-type]
        with self.assertRaises(ValueError):
            await db.test.create_index([])

        await db.test.drop_indexes()
        await db.test.insert_one({})
        self.assertEqual(len(await db.test.index_information()), 1)

        await db.test.create_index("hello")
        await db.test.create_index([("hello", DESCENDING), ("world", ASCENDING)])

        # Tuple instead of list.
        await db.test.create_index((("world", ASCENDING),))

        self.assertEqual(len(await db.test.index_information()), 4)

        await db.test.drop_indexes()
        ix = await db.test.create_index(
            [("hello", DESCENDING), ("world", ASCENDING)], name="hello_world"
        )
        self.assertEqual(ix, "hello_world")

        await db.test.drop_indexes()
        self.assertEqual(len(await db.test.index_information()), 1)
        await db.test.create_index("hello")
        self.assertIn("hello_1", await db.test.index_information())

        await db.test.drop_indexes()
        self.assertEqual(len(await db.test.index_information()), 1)
        await db.test.create_index([("hello", DESCENDING), ("world", ASCENDING)])
        self.assertIn("hello_-1_world_1", await db.test.index_information())

        await db.test.drop_indexes()
        await db.test.create_index([("hello", DESCENDING), ("world", ASCENDING)], name=None)
        self.assertIn("hello_-1_world_1", await db.test.index_information())

        await db.test.drop()
        await db.test.insert_one({"a": 1})
        await db.test.insert_one({"a": 1})
        with self.assertRaises(DuplicateKeyError):
            await db.test.create_index("a", unique=True)

        with self.write_concern_collection() as coll:
            await coll.create_index([("hello", DESCENDING)])

        await db.test.create_index(["hello", "world"])
        await db.test.create_index(["hello", ("world", DESCENDING)])
        await db.test.create_index({"hello": 1}.items())  # type:ignore[arg-type]

    async def test_drop_index(self):
        db = self.db
        await db.test.drop_indexes()
        await db.test.create_index("hello")
        name = await db.test.create_index("goodbye")

        self.assertEqual(len(await db.test.index_information()), 3)
        self.assertEqual(name, "goodbye_1")
        await db.test.drop_index(name)

        # Drop it again.
        if async_client_context.version < Version(8, 3, -1):
            with self.assertRaises(OperationFailure):
                await db.test.drop_index(name)
        else:
            await db.test.drop_index(name)
        self.assertEqual(len(await db.test.index_information()), 2)
        self.assertIn("hello_1", await db.test.index_information())

        await db.test.drop_indexes()
        await db.test.create_index("hello")
        name = await db.test.create_index("goodbye")

        self.assertEqual(len(await db.test.index_information()), 3)
        self.assertEqual(name, "goodbye_1")
        await db.test.drop_index([("goodbye", ASCENDING)])
        self.assertEqual(len(await db.test.index_information()), 2)
        self.assertIn("hello_1", await db.test.index_information())

        with self.write_concern_collection() as coll:
            await coll.drop_index("hello_1")

    @async_client_context.require_no_mongos
    @async_client_context.require_test_commands
    async def test_index_management_max_time_ms(self):
        coll = self.db.test
        await self.client.admin.command(
            "configureFailPoint", "maxTimeAlwaysTimeOut", mode="alwaysOn"
        )
        try:
            with self.assertRaises(ExecutionTimeout):
                await coll.create_index("foo", maxTimeMS=1)
            with self.assertRaises(ExecutionTimeout):
                await coll.create_indexes([IndexModel("foo")], maxTimeMS=1)
            with self.assertRaises(ExecutionTimeout):
                await coll.drop_index("foo", maxTimeMS=1)
            with self.assertRaises(ExecutionTimeout):
                await coll.drop_indexes(maxTimeMS=1)
        finally:
            await self.client.admin.command(
                "configureFailPoint", "maxTimeAlwaysTimeOut", mode="off"
            )

    async def test_list_indexes(self):
        db = self.db
        await db.test.drop()
        await db.test.insert_one({})  # create collection

        def map_indexes(indexes):
            return {index["name"]: index for index in indexes}

        indexes = await (await db.test.list_indexes()).to_list()
        self.assertEqual(len(indexes), 1)
        self.assertIn("_id_", map_indexes(indexes))

        await db.test.create_index("hello")
        indexes = await (await db.test.list_indexes()).to_list()
        self.assertEqual(len(indexes), 2)
        self.assertEqual(map_indexes(indexes)["hello_1"]["key"], SON([("hello", ASCENDING)]))

        await db.test.create_index([("hello", DESCENDING), ("world", ASCENDING)], unique=True)
        indexes = await (await db.test.list_indexes()).to_list()
        self.assertEqual(len(indexes), 3)
        index_map = map_indexes(indexes)
        self.assertEqual(
            index_map["hello_-1_world_1"]["key"], SON([("hello", DESCENDING), ("world", ASCENDING)])
        )
        self.assertEqual(True, index_map["hello_-1_world_1"]["unique"])

        # List indexes on a collection that does not exist.
        indexes = await (await db.does_not_exist.list_indexes()).to_list()
        self.assertEqual(len(indexes), 0)

        # List indexes on a database that does not exist.
        indexes = await (await db.does_not_exist.list_indexes()).to_list()
        self.assertEqual(len(indexes), 0)

    async def test_index_info(self):
        db = self.db
        await db.test.drop()
        await db.test.insert_one({})  # create collection
        self.assertEqual(len(await db.test.index_information()), 1)
        self.assertIn("_id_", await db.test.index_information())

        await db.test.create_index("hello")
        self.assertEqual(len(await db.test.index_information()), 2)
        self.assertEqual(
            (await db.test.index_information())["hello_1"]["key"], [("hello", ASCENDING)]
        )

        await db.test.create_index([("hello", DESCENDING), ("world", ASCENDING)], unique=True)
        self.assertEqual(
            (await db.test.index_information())["hello_1"]["key"], [("hello", ASCENDING)]
        )
        self.assertEqual(len(await db.test.index_information()), 3)
        self.assertEqual(
            [("hello", DESCENDING), ("world", ASCENDING)],
            (await db.test.index_information())["hello_-1_world_1"]["key"],
        )
        self.assertEqual(True, (await db.test.index_information())["hello_-1_world_1"]["unique"])

    async def test_index_geo2d(self):
        db = self.db
        await db.test.drop_indexes()
        self.assertEqual("loc_2d", await db.test.create_index([("loc", GEO2D)]))
        index_info = (await db.test.index_information())["loc_2d"]
        self.assertEqual([("loc", "2d")], index_info["key"])

    # geoSearch was deprecated in 4.4 and removed in 5.0
    @async_client_context.require_version_max(4, 5)
    @async_client_context.require_no_mongos
    async def test_index_haystack(self):
        db = self.db
        await db.test.drop()
        _id = (
            await db.test.insert_one({"pos": {"long": 34.2, "lat": 33.3}, "type": "restaurant"})
        ).inserted_id
        await db.test.insert_one({"pos": {"long": 34.2, "lat": 37.3}, "type": "restaurant"})
        await db.test.insert_one({"pos": {"long": 59.1, "lat": 87.2}, "type": "office"})
        await db.test.create_index([("pos", "geoHaystack"), ("type", ASCENDING)], bucketSize=1)

        results = (
            await db.command(
                SON(
                    [
                        ("geoSearch", "test"),
                        ("near", [33, 33]),
                        ("maxDistance", 6),
                        ("search", {"type": "restaurant"}),
                        ("limit", 30),
                    ]
                )
            )
        )["results"]

        self.assertEqual(2, len(results))
        self.assertEqual(
            {"_id": _id, "pos": {"long": 34.2, "lat": 33.3}, "type": "restaurant"}, results[0]
        )

    @async_client_context.require_no_mongos
    async def test_index_text(self):
        db = self.db
        await db.test.drop_indexes()
        self.assertEqual("t_text", await db.test.create_index([("t", TEXT)]))
        index_info = (await db.test.index_information())["t_text"]
        self.assertIn("weights", index_info)

        await db.test.insert_many(
            [{"t": "spam eggs and spam"}, {"t": "spam"}, {"t": "egg sausage and bacon"}]
        )

        # MongoDB 2.6 text search. Create 'score' field in projection.
        cursor = db.test.find({"$text": {"$search": "spam"}}, {"score": {"$meta": "textScore"}})

        # Sort by 'score' field.
        cursor.sort([("score", {"$meta": "textScore"})])
        results = await cursor.to_list()
        self.assertGreaterEqual(results[0]["score"], results[1]["score"])

        await db.test.drop_indexes()

    async def test_index_2dsphere(self):
        db = self.db
        await db.test.drop_indexes()
        self.assertEqual("geo_2dsphere", await db.test.create_index([("geo", GEOSPHERE)]))

        for dummy, info in (await db.test.index_information()).items():
            field, idx_type = info["key"][0]
            if field == "geo" and idx_type == "2dsphere":
                break
        else:
            self.fail("2dsphere index not found.")

        poly = {"type": "Polygon", "coordinates": [[[40, 5], [40, 6], [41, 6], [41, 5], [40, 5]]]}
        query = {"geo": {"$within": {"$geometry": poly}}}

        # This query will error without a 2dsphere index.
        db.test.find(query)
        await db.test.drop_indexes()

    async def test_index_hashed(self):
        db = self.db
        await db.test.drop_indexes()
        self.assertEqual("a_hashed", await db.test.create_index([("a", HASHED)]))

        for dummy, info in (await db.test.index_information()).items():
            field, idx_type = info["key"][0]
            if field == "a" and idx_type == "hashed":
                break
        else:
            self.fail("hashed index not found.")

        await db.test.drop_indexes()

    async def test_index_sparse(self):
        db = self.db
        await db.test.drop_indexes()
        await db.test.create_index([("key", ASCENDING)], sparse=True)
        self.assertTrue((await db.test.index_information())["key_1"]["sparse"])

    async def test_index_background(self):
        db = self.db
        await db.test.drop_indexes()
        await db.test.create_index([("keya", ASCENDING)])
        await db.test.create_index([("keyb", ASCENDING)], background=False)
        await db.test.create_index([("keyc", ASCENDING)], background=True)
        self.assertNotIn("background", (await db.test.index_information())["keya_1"])
        self.assertFalse((await db.test.index_information())["keyb_1"]["background"])
        self.assertTrue((await db.test.index_information())["keyc_1"]["background"])

    async def _drop_dups_setup(self, db):
        await db.drop_collection("test")
        await db.test.insert_one({"i": 1})
        await db.test.insert_one({"i": 2})
        await db.test.insert_one({"i": 2})  # duplicate
        await db.test.insert_one({"i": 3})

    async def test_index_dont_drop_dups(self):
        # Try *not* dropping duplicates
        db = self.db
        await self._drop_dups_setup(db)

        # There's a duplicate
        async def _test_create():
            await db.test.create_index([("i", ASCENDING)], unique=True, dropDups=False)

        with self.assertRaises(DuplicateKeyError):
            await _test_create()

        # Duplicate wasn't dropped
        self.assertEqual(4, await db.test.count_documents({}))

        # Index wasn't created, only the default index on _id
        self.assertEqual(1, len(await db.test.index_information()))

    # Get the plan dynamically because the explain format will change.
    def get_plan_stage(self, root, stage):
        if root.get("stage") == stage:
            return root
        elif "inputStage" in root:
            return self.get_plan_stage(root["inputStage"], stage)
        elif "inputStages" in root:
            for i in root["inputStages"]:
                stage = self.get_plan_stage(i, stage)
                if stage:
                    return stage
        elif "queryPlan" in root:
            # queryPlan (and slotBasedPlan) are new in 5.0.
            return self.get_plan_stage(root["queryPlan"], stage)
        elif "shards" in root:
            for i in root["shards"]:
                stage = self.get_plan_stage(i["winningPlan"], stage)
                if stage:
                    return stage
        return {}

    async def test_index_filter(self):
        db = self.db
        await db.drop_collection("test")

        # Test bad filter spec on create.
        with self.assertRaises(OperationFailure):
            await db.test.create_index("x", partialFilterExpression=5)
        with self.assertRaises(OperationFailure):
            await db.test.create_index("x", partialFilterExpression={"x": {"$asdasd": 3}})
        with self.assertRaises(OperationFailure):
            await db.test.create_index("x", partialFilterExpression={"$and": 5})

        self.assertEqual(
            "x_1",
            await db.test.create_index(
                [("x", ASCENDING)], partialFilterExpression={"a": {"$lte": 1.5}}
            ),
        )
        await db.test.insert_one({"x": 5, "a": 2})
        await db.test.insert_one({"x": 6, "a": 1})

        # Operations that use the partial index.
        explain = await db.test.find({"x": 6, "a": 1}).explain()
        stage = self.get_plan_stage(explain["queryPlanner"]["winningPlan"], "IXSCAN")
        self.assertEqual("x_1", stage.get("indexName"))
        self.assertTrue(stage.get("isPartial"))

        explain = await db.test.find({"x": {"$gt": 1}, "a": 1}).explain()
        stage = self.get_plan_stage(explain["queryPlanner"]["winningPlan"], "IXSCAN")
        self.assertEqual("x_1", stage.get("indexName"))
        self.assertTrue(stage.get("isPartial"))

        explain = await db.test.find({"x": 6, "a": {"$lte": 1}}).explain()
        stage = self.get_plan_stage(explain["queryPlanner"]["winningPlan"], "IXSCAN")
        self.assertEqual("x_1", stage.get("indexName"))
        self.assertTrue(stage.get("isPartial"))

        # Operations that do not use the partial index.
        explain = await db.test.find({"x": 6, "a": {"$lte": 1.6}}).explain()
        stage = self.get_plan_stage(explain["queryPlanner"]["winningPlan"], "COLLSCAN")
        self.assertNotEqual({}, stage)
        explain = await db.test.find({"x": 6}).explain()
        stage = self.get_plan_stage(explain["queryPlanner"]["winningPlan"], "COLLSCAN")
        self.assertNotEqual({}, stage)

        # Test drop_indexes.
        await db.test.drop_index("x_1")
        explain = await db.test.find({"x": 6, "a": 1}).explain()
        stage = self.get_plan_stage(explain["queryPlanner"]["winningPlan"], "COLLSCAN")
        self.assertNotEqual({}, stage)

    async def test_field_selection(self):
        db = self.db
        await db.drop_collection("test")

        doc = {"a": 1, "b": 5, "c": {"d": 5, "e": 10}}
        await db.test.insert_one(doc)

        # Test field inclusion
        doc = await anext(db.test.find({}, ["_id"]))
        self.assertEqual(list(doc), ["_id"])
        doc = await anext(db.test.find({}, ["a"]))
        l = list(doc)
        l.sort()
        self.assertEqual(l, ["_id", "a"])
        doc = await anext(db.test.find({}, ["b"]))
        l = list(doc)
        l.sort()
        self.assertEqual(l, ["_id", "b"])
        doc = await anext(db.test.find({}, ["c"]))
        l = list(doc)
        l.sort()
        self.assertEqual(l, ["_id", "c"])
        doc = await anext(db.test.find({}, ["a"]))
        self.assertEqual(doc["a"], 1)
        doc = await anext(db.test.find({}, ["b"]))
        self.assertEqual(doc["b"], 5)
        doc = await anext(db.test.find({}, ["c"]))
        self.assertEqual(doc["c"], {"d": 5, "e": 10})

        # Test inclusion of fields with dots
        doc = await anext(db.test.find({}, ["c.d"]))
        self.assertEqual(doc["c"], {"d": 5})
        doc = await anext(db.test.find({}, ["c.e"]))
        self.assertEqual(doc["c"], {"e": 10})
        doc = await anext(db.test.find({}, ["b", "c.e"]))
        self.assertEqual(doc["c"], {"e": 10})

        doc = await anext(db.test.find({}, ["b", "c.e"]))
        l = list(doc)
        l.sort()
        self.assertEqual(l, ["_id", "b", "c"])
        doc = await anext(db.test.find({}, ["b", "c.e"]))
        self.assertEqual(doc["b"], 5)

        # Test field exclusion
        doc = await anext(db.test.find({}, {"a": False, "b": 0}))
        l = list(doc)
        l.sort()
        self.assertEqual(l, ["_id", "c"])

        doc = await anext(db.test.find({}, {"_id": False}))
        l = list(doc)
        self.assertNotIn("_id", l)

    async def test_options(self):
        db = self.db
        await db.drop_collection("test")
        await db.create_collection("test", capped=True, size=4096)
        result = await db.test.options()
        self.assertEqual(result, {"capped": True, "size": 4096})
        await db.drop_collection("test")

    async def test_insert_one(self):
        db = self.db
        await db.test.drop()

        document: dict[str, Any] = {"_id": 1000}
        result = await db.test.insert_one(document)
        self.assertIsInstance(result, InsertOneResult)
        self.assertIsInstance(result.inserted_id, int)
        self.assertEqual(document["_id"], result.inserted_id)
        self.assertTrue(result.acknowledged)
        self.assertIsNotNone(await db.test.find_one({"_id": document["_id"]}))
        self.assertEqual(1, await db.test.count_documents({}))

        document = {"foo": "bar"}
        result = await db.test.insert_one(document)
        self.assertIsInstance(result, InsertOneResult)
        self.assertIsInstance(result.inserted_id, ObjectId)
        self.assertEqual(document["_id"], result.inserted_id)
        self.assertTrue(result.acknowledged)
        self.assertIsNotNone(await db.test.find_one({"_id": document["_id"]}))
        self.assertEqual(2, await db.test.count_documents({}))

        db = db.client.get_database(db.name, write_concern=WriteConcern(w=0))
        result = await db.test.insert_one(document)
        self.assertIsInstance(result, InsertOneResult)
        self.assertIsInstance(result.inserted_id, ObjectId)
        self.assertEqual(document["_id"], result.inserted_id)
        self.assertFalse(result.acknowledged)
        # The insert failed duplicate key...

        async def async_lambda():
            return await db.test.count_documents({}) == 2

        await async_wait_until(async_lambda, "forcing duplicate key error")

        document = RawBSONDocument(encode({"_id": ObjectId(), "foo": "bar"}))
        result = await db.test.insert_one(document)
        self.assertIsInstance(result, InsertOneResult)
        self.assertEqual(result.inserted_id, None)

    async def test_insert_many(self):
        db = self.db
        await db.test.drop()

        docs: list = [{} for _ in range(5)]
        result = await db.test.insert_many(docs)
        self.assertIsInstance(result, InsertManyResult)
        self.assertIsInstance(result.inserted_ids, list)
        self.assertEqual(5, len(result.inserted_ids))
        for doc in docs:
            _id = doc["_id"]
            self.assertIsInstance(_id, ObjectId)
            self.assertIn(_id, result.inserted_ids)
            self.assertEqual(1, await db.test.count_documents({"_id": _id}))
        self.assertTrue(result.acknowledged)

        docs = [{"_id": i} for i in range(5)]
        result = await db.test.insert_many(docs)
        self.assertIsInstance(result, InsertManyResult)
        self.assertIsInstance(result.inserted_ids, list)
        self.assertEqual(5, len(result.inserted_ids))
        for doc in docs:
            _id = doc["_id"]
            self.assertIsInstance(_id, int)
            self.assertIn(_id, result.inserted_ids)
            self.assertEqual(1, await db.test.count_documents({"_id": _id}))
        self.assertTrue(result.acknowledged)

        docs = [RawBSONDocument(encode({"_id": i + 5})) for i in range(5)]
        result = await db.test.insert_many(docs)
        self.assertIsInstance(result, InsertManyResult)
        self.assertIsInstance(result.inserted_ids, list)
        self.assertEqual([], result.inserted_ids)

        db = db.client.get_database(db.name, write_concern=WriteConcern(w=0))
        docs: list = [{} for _ in range(5)]
        result = await db.test.insert_many(docs)
        self.assertIsInstance(result, InsertManyResult)
        self.assertFalse(result.acknowledged)
        self.assertEqual(20, await db.test.count_documents({}))

    async def test_insert_many_generator(self):
        coll = self.db.test
        await coll.delete_many({})

        def gen():
            yield {"a": 1, "b": 1}
            yield {"a": 1, "b": 2}
            yield {"a": 2, "b": 3}
            yield {"a": 3, "b": 5}
            yield {"a": 5, "b": 8}

        result = await coll.insert_many(gen())
        self.assertEqual(5, len(result.inserted_ids))

    async def test_insert_many_invalid(self):
        db = self.db

        with self.assertRaisesRegex(TypeError, "documents must be a non-empty list"):
            await db.test.insert_many({})

        with self.assertRaisesRegex(TypeError, "documents must be a non-empty list"):
            await db.test.insert_many([])

        with self.assertRaisesRegex(TypeError, "documents must be a non-empty list"):
            await db.test.insert_many(1)  # type: ignore[arg-type]

        with self.assertRaisesRegex(TypeError, "documents must be a non-empty list"):
            await db.test.insert_many(RawBSONDocument(encode({"_id": 2})))

    async def test_delete_one(self):
        await self.db.test.drop()

        await self.db.test.insert_one({"x": 1})
        await self.db.test.insert_one({"y": 1})
        await self.db.test.insert_one({"z": 1})

        result = await self.db.test.delete_one({"x": 1})
        self.assertIsInstance(result, DeleteResult)
        self.assertEqual(1, result.deleted_count)
        self.assertTrue(result.acknowledged)
        self.assertEqual(2, await self.db.test.count_documents({}))

        result = await self.db.test.delete_one({"y": 1})
        self.assertIsInstance(result, DeleteResult)
        self.assertEqual(1, result.deleted_count)
        self.assertTrue(result.acknowledged)
        self.assertEqual(1, await self.db.test.count_documents({}))

        db = self.db.client.get_database(self.db.name, write_concern=WriteConcern(w=0))
        result = await db.test.delete_one({"z": 1})
        self.assertIsInstance(result, DeleteResult)
        self.assertRaises(InvalidOperation, lambda: result.deleted_count)
        self.assertFalse(result.acknowledged)

        async def lambda_async():
            return await db.test.count_documents({}) == 0

        await async_wait_until(lambda_async, "delete 1 documents")

    async def test_delete_many(self):
        await self.db.test.drop()

        await self.db.test.insert_one({"x": 1})
        await self.db.test.insert_one({"x": 1})
        await self.db.test.insert_one({"y": 1})
        await self.db.test.insert_one({"y": 1})

        result = await self.db.test.delete_many({"x": 1})
        self.assertIsInstance(result, DeleteResult)
        self.assertEqual(2, result.deleted_count)
        self.assertTrue(result.acknowledged)
        self.assertEqual(0, await self.db.test.count_documents({"x": 1}))

        db = self.db.client.get_database(self.db.name, write_concern=WriteConcern(w=0))
        result = await db.test.delete_many({"y": 1})
        self.assertIsInstance(result, DeleteResult)
        self.assertRaises(InvalidOperation, lambda: result.deleted_count)
        self.assertFalse(result.acknowledged)

        async def lambda_async():
            return await db.test.count_documents({}) == 0

        await async_wait_until(lambda_async, "delete 2 documents")

    async def test_command_document_too_large(self):
        large = "*" * (await async_client_context.max_bson_size + _COMMAND_OVERHEAD)
        coll = self.db.test
        with self.assertRaises(DocumentTooLarge):
            await coll.insert_one({"data": large})
        # update_one and update_many are the same
        with self.assertRaises(DocumentTooLarge):
            await coll.replace_one({}, {"data": large})
        with self.assertRaises(DocumentTooLarge):
            await coll.delete_one({"data": large})

    async def test_write_large_document(self):
        max_size = await async_client_context.max_bson_size
        half_size = int(max_size / 2)
        max_str = "x" * max_size
        half_str = "x" * half_size
        self.assertEqual(max_size, 16777216)

        with self.assertRaises(OperationFailure):
            await self.db.test.insert_one({"foo": max_str})
        with self.assertRaises(OperationFailure):
            await self.db.test.replace_one({}, {"foo": max_str}, upsert=True)
        with self.assertRaises(OperationFailure):
            await self.db.test.insert_many([{"x": 1}, {"foo": max_str}])
        await self.db.test.insert_many([{"foo": half_str}, {"foo": half_str}])

        await self.db.test.insert_one({"bar": "x"})
        # Use w=0 here to test legacy doc size checking in all server versions
        unack_coll = self.db.test.with_options(write_concern=WriteConcern(w=0))
        with self.assertRaises(DocumentTooLarge):
            await unack_coll.replace_one({"bar": "x"}, {"bar": "x" * (max_size - 14)})
        await self.db.test.replace_one({"bar": "x"}, {"bar": "x" * (max_size - 32)})

    async def test_insert_bypass_document_validation(self):
        db = self.db
        await db.test.drop()
        await db.create_collection("test", validator={"a": {"$exists": True}})
        db_w0 = self.db.client.get_database(self.db.name, write_concern=WriteConcern(w=0))

        # Test insert_one
        with self.assertRaises(OperationFailure):
            await db.test.insert_one({"_id": 1, "x": 100})
        result = await db.test.insert_one({"_id": 1, "x": 100}, bypass_document_validation=True)
        self.assertIsInstance(result, InsertOneResult)
        self.assertEqual(1, result.inserted_id)
        result = await db.test.insert_one({"_id": 2, "a": 0})
        self.assertIsInstance(result, InsertOneResult)
        self.assertEqual(2, result.inserted_id)

        await db_w0.test.insert_one({"y": 1}, bypass_document_validation=True)

        async def async_lambda():
            return await db_w0.test.find_one({"y": 1})

        await async_wait_until(async_lambda, "find w:0 inserted document")

        # Test insert_many
        docs = [{"_id": i, "x": 100 - i} for i in range(3, 100)]
        with self.assertRaises(OperationFailure):
            await db.test.insert_many(docs)
        result = await db.test.insert_many(docs, bypass_document_validation=True)
        self.assertIsInstance(result, InsertManyResult)
        self.assertTrue(97, len(result.inserted_ids))
        for doc in docs:
            _id = doc["_id"]
            self.assertIsInstance(_id, int)
            self.assertIn(_id, result.inserted_ids)
            self.assertEqual(1, await db.test.count_documents({"x": doc["x"]}))
        self.assertTrue(result.acknowledged)
        docs = [{"_id": i, "a": 200 - i} for i in range(100, 200)]
        result = await db.test.insert_many(docs)
        self.assertIsInstance(result, InsertManyResult)
        self.assertTrue(97, len(result.inserted_ids))
        for doc in docs:
            _id = doc["_id"]
            self.assertIsInstance(_id, int)
            self.assertIn(_id, result.inserted_ids)
            self.assertEqual(1, await db.test.count_documents({"a": doc["a"]}))
        self.assertTrue(result.acknowledged)

        with self.assertRaises(OperationFailure):
            await db_w0.test.insert_many(
                [{"x": 1}, {"x": 2}],
                bypass_document_validation=True,
            )

    async def test_replace_bypass_document_validation(self):
        db = self.db
        await db.test.drop()
        await db.create_collection("test", validator={"a": {"$exists": True}})
        db_w0 = self.db.client.get_database(self.db.name, write_concern=WriteConcern(w=0))

        # Test replace_one
        await db.test.insert_one({"a": 101})
        with self.assertRaises(OperationFailure):
            await db.test.replace_one({"a": 101}, {"y": 1})
        self.assertEqual(0, await db.test.count_documents({"y": 1}))
        self.assertEqual(1, await db.test.count_documents({"a": 101}))
        await db.test.replace_one({"a": 101}, {"y": 1}, bypass_document_validation=True)
        self.assertEqual(0, await db.test.count_documents({"a": 101}))
        self.assertEqual(1, await db.test.count_documents({"y": 1}))
        await db.test.replace_one({"y": 1}, {"a": 102})
        self.assertEqual(0, await db.test.count_documents({"y": 1}))
        self.assertEqual(0, await db.test.count_documents({"a": 101}))
        self.assertEqual(1, await db.test.count_documents({"a": 102}))

        await db.test.insert_one({"y": 1}, bypass_document_validation=True)
        with self.assertRaises(OperationFailure):
            await db.test.replace_one({"y": 1}, {"x": 101})
        self.assertEqual(0, await db.test.count_documents({"x": 101}))
        self.assertEqual(1, await db.test.count_documents({"y": 1}))
        await db.test.replace_one({"y": 1}, {"x": 101}, bypass_document_validation=True)
        self.assertEqual(0, await db.test.count_documents({"y": 1}))
        self.assertEqual(1, await db.test.count_documents({"x": 101}))
        await db.test.replace_one({"x": 101}, {"a": 103}, bypass_document_validation=False)
        self.assertEqual(0, await db.test.count_documents({"x": 101}))
        self.assertEqual(1, await db.test.count_documents({"a": 103}))

        await db.test.insert_one({"y": 1}, bypass_document_validation=True)
        await db_w0.test.replace_one({"y": 1}, {"x": 1}, bypass_document_validation=True)

        async def predicate():
            return await db_w0.test.find_one({"x": 1})

        await async_wait_until(predicate, "find w:0 replaced document")

    async def test_update_bypass_document_validation(self):
        db = self.db
        await db.test.drop()
        await db.test.insert_one({"z": 5})
        await db.command(SON([("collMod", "test"), ("validator", {"z": {"$gte": 0}})]))
        db_w0 = self.db.client.get_database(self.db.name, write_concern=WriteConcern(w=0))

        # Test update_one
        with self.assertRaises(OperationFailure):
            await db.test.update_one({"z": 5}, {"$inc": {"z": -10}})
        self.assertEqual(0, await db.test.count_documents({"z": -5}))
        self.assertEqual(1, await db.test.count_documents({"z": 5}))
        await db.test.update_one({"z": 5}, {"$inc": {"z": -10}}, bypass_document_validation=True)
        self.assertEqual(0, await db.test.count_documents({"z": 5}))
        self.assertEqual(1, await db.test.count_documents({"z": -5}))
        await db.test.update_one({"z": -5}, {"$inc": {"z": 6}}, bypass_document_validation=False)
        self.assertEqual(1, await db.test.count_documents({"z": 1}))
        self.assertEqual(0, await db.test.count_documents({"z": -5}))

        await db.test.insert_one({"z": -10}, bypass_document_validation=True)
        with self.assertRaises(OperationFailure):
            await db.test.update_one({"z": -10}, {"$inc": {"z": 1}})
        self.assertEqual(0, await db.test.count_documents({"z": -9}))
        self.assertEqual(1, await db.test.count_documents({"z": -10}))
        await db.test.update_one({"z": -10}, {"$inc": {"z": 1}}, bypass_document_validation=True)
        self.assertEqual(1, await db.test.count_documents({"z": -9}))
        self.assertEqual(0, await db.test.count_documents({"z": -10}))
        await db.test.update_one({"z": -9}, {"$inc": {"z": 9}}, bypass_document_validation=False)
        self.assertEqual(0, await db.test.count_documents({"z": -9}))
        self.assertEqual(1, await db.test.count_documents({"z": 0}))

        await db.test.insert_one({"y": 1, "x": 0}, bypass_document_validation=True)
        await db_w0.test.update_one({"y": 1}, {"$inc": {"x": 1}}, bypass_document_validation=True)

        async def async_lambda():
            return await db_w0.test.find_one({"y": 1, "x": 1})

        await async_wait_until(async_lambda, "find w:0 updated document")

        # Test update_many
        await db.test.insert_many([{"z": i} for i in range(3, 101)])
        await db.test.insert_one({"y": 0}, bypass_document_validation=True)
        with self.assertRaises(OperationFailure):
            await db.test.update_many({}, {"$inc": {"z": -100}})
        self.assertEqual(100, await db.test.count_documents({"z": {"$gte": 0}}))
        self.assertEqual(0, await db.test.count_documents({"z": {"$lt": 0}}))
        self.assertEqual(0, await db.test.count_documents({"y": 0, "z": -100}))
        await db.test.update_many(
            {"z": {"$gte": 0}}, {"$inc": {"z": -100}}, bypass_document_validation=True
        )
        self.assertEqual(0, await db.test.count_documents({"z": {"$gt": 0}}))
        self.assertEqual(100, await db.test.count_documents({"z": {"$lte": 0}}))
        await db.test.update_many(
            {"z": {"$gt": -50}}, {"$inc": {"z": 100}}, bypass_document_validation=False
        )
        self.assertEqual(50, await db.test.count_documents({"z": {"$gt": 0}}))
        self.assertEqual(50, await db.test.count_documents({"z": {"$lt": 0}}))

        await db.test.insert_many([{"z": -i} for i in range(50)], bypass_document_validation=True)
        with self.assertRaises(OperationFailure):
            await db.test.update_many({}, {"$inc": {"z": 1}})
        self.assertEqual(100, await db.test.count_documents({"z": {"$lte": 0}}))
        self.assertEqual(50, await db.test.count_documents({"z": {"$gt": 1}}))
        await db.test.update_many(
            {"z": {"$gte": 0}}, {"$inc": {"z": -100}}, bypass_document_validation=True
        )
        self.assertEqual(0, await db.test.count_documents({"z": {"$gt": 0}}))
        self.assertEqual(150, await db.test.count_documents({"z": {"$lte": 0}}))
        await db.test.update_many(
            {"z": {"$lte": 0}}, {"$inc": {"z": 100}}, bypass_document_validation=False
        )
        self.assertEqual(150, await db.test.count_documents({"z": {"$gte": 0}}))
        self.assertEqual(0, await db.test.count_documents({"z": {"$lt": 0}}))

        await db.test.insert_one({"m": 1, "x": 0}, bypass_document_validation=True)
        await db.test.insert_one({"m": 1, "x": 0}, bypass_document_validation=True)
        await db_w0.test.update_many({"m": 1}, {"$inc": {"x": 1}}, bypass_document_validation=True)

        async def async_lambda():
            return await db_w0.test.count_documents({"m": 1, "x": 1}) == 2

        await async_wait_until(async_lambda, "find w:0 updated documents")

    async def test_bypass_document_validation_bulk_write(self):
        db = self.db
        await db.test.drop()
        await db.create_collection("test", validator={"a": {"$gte": 0}})
        db_w0 = self.db.client.get_database(self.db.name, write_concern=WriteConcern(w=0))

        ops: list = [
            InsertOne({"a": -10}),
            InsertOne({"a": -11}),
            InsertOne({"a": -12}),
            UpdateOne({"a": {"$lte": -10}}, {"$inc": {"a": 1}}),
            UpdateMany({"a": {"$lte": -10}}, {"$inc": {"a": 1}}),
            ReplaceOne({"a": {"$lte": -10}}, {"a": -1}),
        ]
        await db.test.bulk_write(ops, bypass_document_validation=True)

        self.assertEqual(3, await db.test.count_documents({}))
        self.assertEqual(1, await db.test.count_documents({"a": -11}))
        self.assertEqual(1, await db.test.count_documents({"a": -1}))
        self.assertEqual(1, await db.test.count_documents({"a": -9}))

        # Assert that the operations would fail without bypass_doc_val
        for op in ops:
            with self.assertRaises(BulkWriteError):
                await db.test.bulk_write([op])

        with self.assertRaises(OperationFailure):
            await db_w0.test.bulk_write(ops, bypass_document_validation=True)

    async def test_find_by_default_dct(self):
        db = self.db
        await db.test.insert_one({"foo": "bar"})
        dct = defaultdict(dict, [("foo", "bar")])  # type: ignore[arg-type]
        self.assertIsNotNone(await db.test.find_one(dct))
        self.assertEqual(dct, defaultdict(dict, [("foo", "bar")]))

    async def test_find_w_fields(self):
        db = self.db
        await db.test.delete_many({})

        await db.test.insert_one(
            {"x": 1, "mike": "awesome", "extra thing": "abcdefghijklmnopqrstuvwxyz"}
        )
        self.assertEqual(1, await db.test.count_documents({}))
        doc = await anext(db.test.find({}))
        self.assertIn("x", doc)
        doc = await anext(db.test.find({}))
        self.assertIn("mike", doc)
        doc = await anext(db.test.find({}))
        self.assertIn("extra thing", doc)
        doc = await anext(db.test.find({}, ["x", "mike"]))
        self.assertIn("x", doc)
        doc = await anext(db.test.find({}, ["x", "mike"]))
        self.assertIn("mike", doc)
        doc = await anext(db.test.find({}, ["x", "mike"]))
        self.assertNotIn("extra thing", doc)
        doc = await anext(db.test.find({}, ["mike"]))
        self.assertNotIn("x", doc)
        doc = await anext(db.test.find({}, ["mike"]))
        self.assertIn("mike", doc)
        doc = await anext(db.test.find({}, ["mike"]))
        self.assertNotIn("extra thing", doc)

    @no_type_check
    async def test_fields_specifier_as_dict(self):
        db = self.db
        await db.test.delete_many({})

        await db.test.insert_one({"x": [1, 2, 3], "mike": "awesome"})

        self.assertEqual([1, 2, 3], (await db.test.find_one())["x"])
        self.assertEqual([2, 3], (await db.test.find_one(projection={"x": {"$slice": -2}}))["x"])
        self.assertNotIn("x", await db.test.find_one(projection={"x": 0}))
        self.assertIn("mike", await db.test.find_one(projection={"x": 0}))

    async def test_find_w_regex(self):
        db = self.db
        await db.test.delete_many({})

        await db.test.insert_one({"x": "hello_world"})
        await db.test.insert_one({"x": "hello_mike"})
        await db.test.insert_one({"x": "hello_mikey"})
        await db.test.insert_one({"x": "hello_test"})

        self.assertEqual(len(await db.test.find().to_list()), 4)
        self.assertEqual(len(await db.test.find({"x": re.compile("^hello.*")}).to_list()), 4)
        self.assertEqual(len(await db.test.find({"x": re.compile("ello")}).to_list()), 4)
        self.assertEqual(len(await db.test.find({"x": re.compile("^hello$")}).to_list()), 0)
        self.assertEqual(len(await db.test.find({"x": re.compile("^hello_mi.*$")}).to_list()), 2)

    async def test_id_can_be_anything(self):
        db = self.db

        await db.test.delete_many({})
        auto_id = {"hello": "world"}
        await db.test.insert_one(auto_id)
        self.assertIsInstance(auto_id["_id"], ObjectId)

        numeric = {"_id": 240, "hello": "world"}
        await db.test.insert_one(numeric)
        self.assertEqual(numeric["_id"], 240)

        obj = {"_id": numeric, "hello": "world"}
        await db.test.insert_one(obj)
        self.assertEqual(obj["_id"], numeric)

        async for x in db.test.find():
            self.assertEqual(x["hello"], "world")
            self.assertIn("_id", x)

    async def test_unique_index(self):
        db = self.db
        await db.drop_collection("test")
        await db.test.create_index("hello")

        # No error.
        await db.test.insert_one({"hello": "world"})
        await db.test.insert_one({"hello": "world"})

        await db.drop_collection("test")
        await db.test.create_index("hello", unique=True)

        with self.assertRaises(DuplicateKeyError):
            await db.test.insert_one({"hello": "world"})
            await db.test.insert_one({"hello": "world"})

    async def test_duplicate_key_error(self):
        db = self.db
        await db.drop_collection("test")

        await db.test.create_index("x", unique=True)

        await db.test.insert_one({"_id": 1, "x": 1})

        with self.assertRaises(DuplicateKeyError) as context:
            await db.test.insert_one({"x": 1})

        self.assertIsNotNone(context.exception.details)

        with self.assertRaises(DuplicateKeyError) as context:
            await db.test.insert_one({"x": 1})

        self.assertIsNotNone(context.exception.details)
        self.assertEqual(1, await db.test.count_documents({}))

    async def test_write_error_text_handling(self):
        db = self.db
        await db.drop_collection("test")

        await db.test.create_index("text", unique=True)

        # Test workaround for SERVER-24007
        data = (
            b"a\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
            b"\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83\xe2\x98\x83"
        )

        text = utf_8_decode(data, None, True)
        await db.test.insert_one({"text": text})

        # Should raise DuplicateKeyError, not InvalidBSON
        with self.assertRaises(DuplicateKeyError):
            await db.test.insert_one({"text": text})

        with self.assertRaises(DuplicateKeyError):
            await db.test.replace_one({"_id": ObjectId()}, {"text": text}, upsert=True)

        # Should raise BulkWriteError, not InvalidBSON
        with self.assertRaises(BulkWriteError):
            await db.test.insert_many([{"text": text}])

    async def test_write_error_unicode(self):
        coll = self.db.test
        self.addAsyncCleanup(coll.drop)

        await coll.create_index("a", unique=True)
        await coll.insert_one({"a": "unicode \U0001f40d"})
        with self.assertRaisesRegex(DuplicateKeyError, "E11000 duplicate key error") as ctx:
            await coll.insert_one({"a": "unicode \U0001f40d"})

        # Once more for good measure.
        self.assertIn("E11000 duplicate key error", str(ctx.exception))

    async def test_wtimeout(self):
        # Ensure setting wtimeout doesn't disable write concern altogether.
        # See SERVER-12596.
        collection = self.db.test
        await collection.drop()
        await collection.insert_one({"_id": 1})

        coll = collection.with_options(write_concern=WriteConcern(w=1, wtimeout=1000))
        with self.assertRaises(DuplicateKeyError):
            await coll.insert_one({"_id": 1})

        coll = collection.with_options(write_concern=WriteConcern(wtimeout=1000))
        with self.assertRaises(DuplicateKeyError):
            await coll.insert_one({"_id": 1})

    async def test_error_code(self):
        try:
            await self.db.test.update_many({}, {"$thismodifierdoesntexist": 1})
        except OperationFailure as exc:
            self.assertIn(exc.code, (9, 10147, 16840, 17009))
            # Just check that we set the error document. Fields
            # vary by MongoDB version.
            self.assertIsNotNone(exc.details)
        else:
            self.fail("OperationFailure was not raised")

    async def test_index_on_subfield(self):
        db = self.db
        await db.drop_collection("test")

        await db.test.insert_one({"hello": {"a": 4, "b": 5}})
        await db.test.insert_one({"hello": {"a": 7, "b": 2}})
        await db.test.insert_one({"hello": {"a": 4, "b": 10}})

        await db.drop_collection("test")
        await db.test.create_index("hello.a", unique=True)

        await db.test.insert_one({"hello": {"a": 4, "b": 5}})
        await db.test.insert_one({"hello": {"a": 7, "b": 2}})
        with self.assertRaises(DuplicateKeyError):
            await db.test.insert_one({"hello": {"a": 4, "b": 10}})

    async def test_replace_one(self):
        db = self.db
        await db.drop_collection("test")

        with self.assertRaises(ValueError):
            await db.test.replace_one({}, {"$set": {"x": 1}})

        id1 = (await db.test.insert_one({"x": 1})).inserted_id
        result = await db.test.replace_one({"x": 1}, {"y": 1})
        self.assertIsInstance(result, UpdateResult)
        self.assertEqual(1, result.matched_count)
        self.assertIn(result.modified_count, (None, 1))
        self.assertIsNone(result.upserted_id)
        self.assertTrue(result.acknowledged)
        self.assertEqual(1, await db.test.count_documents({"y": 1}))
        self.assertEqual(0, await db.test.count_documents({"x": 1}))
        self.assertEqual((await db.test.find_one(id1))["y"], 1)  # type: ignore

        replacement = RawBSONDocument(encode({"_id": id1, "z": 1}))
        result = await db.test.replace_one({"y": 1}, replacement, True)
        self.assertIsInstance(result, UpdateResult)
        self.assertEqual(1, result.matched_count)
        self.assertIn(result.modified_count, (None, 1))
        self.assertIsNone(result.upserted_id)
        self.assertTrue(result.acknowledged)
        self.assertEqual(1, await db.test.count_documents({"z": 1}))
        self.assertEqual(0, await db.test.count_documents({"y": 1}))
        self.assertEqual((await db.test.find_one(id1))["z"], 1)  # type: ignore

        result = await db.test.replace_one({"x": 2}, {"y": 2}, True)
        self.assertIsInstance(result, UpdateResult)
        self.assertEqual(0, result.matched_count)
        self.assertIn(result.modified_count, (None, 0))
        self.assertIsInstance(result.upserted_id, ObjectId)
        self.assertTrue(result.acknowledged)
        self.assertEqual(1, await db.test.count_documents({"y": 2}))

        db = db.client.get_database(db.name, write_concern=WriteConcern(w=0))
        result = await db.test.replace_one({"x": 0}, {"y": 0})
        self.assertIsInstance(result, UpdateResult)
        self.assertRaises(InvalidOperation, lambda: result.matched_count)
        self.assertRaises(InvalidOperation, lambda: result.modified_count)
        self.assertRaises(InvalidOperation, lambda: result.upserted_id)
        self.assertFalse(result.acknowledged)

    async def test_update_one(self):
        db = self.db
        await db.drop_collection("test")

        with self.assertRaises(ValueError):
            await db.test.update_one({}, {"x": 1})

        id1 = (await db.test.insert_one({"x": 5})).inserted_id
        result = await db.test.update_one({}, {"$inc": {"x": 1}})
        self.assertIsInstance(result, UpdateResult)
        self.assertEqual(1, result.matched_count)
        self.assertIn(result.modified_count, (None, 1))
        self.assertIsNone(result.upserted_id)
        self.assertTrue(result.acknowledged)
        self.assertEqual((await db.test.find_one(id1))["x"], 6)  # type: ignore

        id2 = (await db.test.insert_one({"x": 1})).inserted_id
        result = await db.test.update_one({"x": 6}, {"$inc": {"x": 1}})
        self.assertIsInstance(result, UpdateResult)
        self.assertEqual(1, result.matched_count)
        self.assertIn(result.modified_count, (None, 1))
        self.assertIsNone(result.upserted_id)
        self.assertTrue(result.acknowledged)
        self.assertEqual((await db.test.find_one(id1))["x"], 7)  # type: ignore
        self.assertEqual((await db.test.find_one(id2))["x"], 1)  # type: ignore

        result = await db.test.update_one({"x": 2}, {"$set": {"y": 1}}, True)
        self.assertIsInstance(result, UpdateResult)
        self.assertEqual(0, result.matched_count)
        self.assertIn(result.modified_count, (None, 0))
        self.assertIsInstance(result.upserted_id, ObjectId)
        self.assertTrue(result.acknowledged)

        db = db.client.get_database(db.name, write_concern=WriteConcern(w=0))
        result = await db.test.update_one({"x": 0}, {"$inc": {"x": 1}})
        self.assertIsInstance(result, UpdateResult)
        self.assertRaises(InvalidOperation, lambda: result.matched_count)
        self.assertRaises(InvalidOperation, lambda: result.modified_count)
        self.assertRaises(InvalidOperation, lambda: result.upserted_id)
        self.assertFalse(result.acknowledged)

    async def test_update_result(self):
        db = self.db
        await db.drop_collection("test")

        result = await db.test.update_one({"x": 0}, {"$inc": {"x": 1}}, upsert=True)
        self.assertEqual(result.did_upsert, True)

        result = await db.test.update_one({"_id": None, "x": 0}, {"$inc": {"x": 1}}, upsert=True)
        self.assertEqual(result.did_upsert, True)

        result = await db.test.update_one({"_id": None}, {"$inc": {"x": 1}})
        self.assertEqual(result.did_upsert, False)

    async def test_update_many(self):
        db = self.db
        await db.drop_collection("test")

        with self.assertRaises(ValueError):
            await db.test.update_many({}, {"x": 1})

        await db.test.insert_one({"x": 4, "y": 3})
        await db.test.insert_one({"x": 5, "y": 5})
        await db.test.insert_one({"x": 4, "y": 4})

        result = await db.test.update_many({"x": 4}, {"$set": {"y": 5}})
        self.assertIsInstance(result, UpdateResult)
        self.assertEqual(2, result.matched_count)
        self.assertIn(result.modified_count, (None, 2))
        self.assertIsNone(result.upserted_id)
        self.assertTrue(result.acknowledged)
        self.assertEqual(3, await db.test.count_documents({"y": 5}))

        result = await db.test.update_many({"x": 5}, {"$set": {"y": 6}})
        self.assertIsInstance(result, UpdateResult)
        self.assertEqual(1, result.matched_count)
        self.assertIn(result.modified_count, (None, 1))
        self.assertIsNone(result.upserted_id)
        self.assertTrue(result.acknowledged)
        self.assertEqual(1, await db.test.count_documents({"y": 6}))

        result = await db.test.update_many({"x": 2}, {"$set": {"y": 1}}, True)
        self.assertIsInstance(result, UpdateResult)
        self.assertEqual(0, result.matched_count)
        self.assertIn(result.modified_count, (None, 0))
        self.assertIsInstance(result.upserted_id, ObjectId)
        self.assertTrue(result.acknowledged)

        db = db.client.get_database(db.name, write_concern=WriteConcern(w=0))
        result = await db.test.update_many({"x": 0}, {"$inc": {"x": 1}})
        self.assertIsInstance(result, UpdateResult)
        self.assertRaises(InvalidOperation, lambda: result.matched_count)
        self.assertRaises(InvalidOperation, lambda: result.modified_count)
        self.assertRaises(InvalidOperation, lambda: result.upserted_id)
        self.assertFalse(result.acknowledged)

    async def test_update_check_keys(self):
        await self.db.drop_collection("test")
        self.assertTrue(await self.db.test.insert_one({"hello": "world"}))

        # Modify shouldn't check keys...
        self.assertTrue(
            await self.db.test.update_one(
                {"hello": "world"}, {"$set": {"foo.bar": "baz"}}, upsert=True
            )
        )

        # I know this seems like testing the server but I'd like to be notified
        # by CI if the server's behavior changes here.
        doc = SON([("$set", {"foo.bar": "bim"}), ("hello", "world")])
        with self.assertRaises(OperationFailure):
            await self.db.test.update_one({"hello": "world"}, doc, upsert=True)

        # This is going to cause keys to be checked and raise InvalidDocument.
        # That's OK assuming the server's behavior in the previous assert
        # doesn't change. If the behavior changes checking the first key for
        # '$' in update won't be good enough anymore.
        doc = SON([("hello", "world"), ("$set", {"foo.bar": "bim"})])
        with self.assertRaises(OperationFailure):
            await self.db.test.replace_one({"hello": "world"}, doc, upsert=True)

        # Replace with empty document
        self.assertNotEqual(
            0, (await self.db.test.replace_one({"hello": "world"}, {})).matched_count
        )

    async def test_acknowledged_delete(self):
        db = self.db
        await db.drop_collection("test")
        await db.test.insert_many([{"x": 1}, {"x": 1}])
        self.assertEqual(2, (await db.test.delete_many({})).deleted_count)
        self.assertEqual(0, (await db.test.delete_many({})).deleted_count)

    @async_client_context.require_version_max(4, 9)
    async def test_manual_last_error(self):
        coll = self.db.get_collection("test", write_concern=WriteConcern(w=0))
        await coll.insert_one({"x": 1})
        await self.db.command("getlasterror", w=1, wtimeout=1)

    async def test_count_documents(self):
        db = self.db
        await db.drop_collection("test")
        self.addAsyncCleanup(db.drop_collection, "test")

        self.assertEqual(await db.test.count_documents({}), 0)
        await db.wrong.insert_many([{}, {}])
        self.assertEqual(await db.test.count_documents({}), 0)
        await db.test.insert_many([{}, {}])
        self.assertEqual(await db.test.count_documents({}), 2)
        await db.test.insert_many([{"foo": "bar"}, {"foo": "baz"}])
        self.assertEqual(await db.test.count_documents({"foo": "bar"}), 1)
        self.assertEqual(await db.test.count_documents({"foo": re.compile(r"ba.*")}), 2)

    async def test_estimated_document_count(self):
        db = self.db
        await db.drop_collection("test")
        self.addAsyncCleanup(db.drop_collection, "test")

        self.assertEqual(await db.test.estimated_document_count(), 0)
        await db.wrong.insert_many([{}, {}])
        self.assertEqual(await db.test.estimated_document_count(), 0)
        await db.test.insert_many([{}, {}])
        self.assertEqual(await db.test.estimated_document_count(), 2)

    async def test_aggregate(self):
        db = self.db
        await db.drop_collection("test")
        await db.test.insert_one({"foo": [1, 2]})

        with self.assertRaises(TypeError):
            await db.test.aggregate("wow")  # type: ignore[arg-type]

        pipeline = {"$project": {"_id": False, "foo": True}}
        result = await db.test.aggregate([pipeline])
        self.assertIsInstance(result, AsyncCommandCursor)
        self.assertEqual([{"foo": [1, 2]}], await result.to_list())

        # Test write concern.
        with self.write_concern_collection() as coll:
            await coll.aggregate([{"$out": "output-collection"}])

    async def test_aggregate_raw_bson(self):
        db = self.db
        await db.drop_collection("test")
        await db.test.insert_one({"foo": [1, 2]})

        with self.assertRaises(TypeError):
            await db.test.aggregate("wow")  # type: ignore[arg-type]

        pipeline = {"$project": {"_id": False, "foo": True}}
        coll = db.get_collection("test", codec_options=CodecOptions(document_class=RawBSONDocument))
        result = await coll.aggregate([pipeline])
        self.assertIsInstance(result, AsyncCommandCursor)
        first_result = await anext(result)
        self.assertIsInstance(first_result, RawBSONDocument)
        self.assertEqual([1, 2], list(first_result["foo"]))

    async def test_aggregation_cursor_validation(self):
        db = self.db
        projection = {"$project": {"_id": "$_id"}}
        cursor = await db.test.aggregate([projection], cursor={})
        self.assertIsInstance(cursor, AsyncCommandCursor)

    async def test_aggregation_cursor(self):
        db = self.db
        if await async_client_context.has_secondaries:
            # Test that getMore messages are sent to the right server.
            db = self.client.get_database(
                db.name,
                read_preference=ReadPreference.SECONDARY,
                write_concern=WriteConcern(w=self.w),
            )

        for collection_size in (10, 1000):
            await db.drop_collection("test")
            await db.test.insert_many([{"_id": i} for i in range(collection_size)])
            expected_sum = sum(range(collection_size))
            # Use batchSize to ensure multiple getMore messages
            cursor = await db.test.aggregate([{"$project": {"_id": "$_id"}}], batchSize=5)

            self.assertEqual(expected_sum, sum(doc["_id"] for doc in await cursor.to_list()))

        # Test that batchSize is handled properly.
        cursor = await db.test.aggregate([], batchSize=5)
        self.assertEqual(5, len(cursor._data))
        # Force a getMore
        cursor._data.clear()
        await anext(cursor)
        # batchSize - 1
        self.assertEqual(4, len(cursor._data))
        # Exhaust the cursor. There shouldn't be any errors.
        async for _doc in cursor:
            pass

    async def test_aggregation_cursor_alive(self):
        await self.db.test.delete_many({})
        await self.db.test.insert_many([{} for _ in range(3)])
        self.addAsyncCleanup(self.db.test.delete_many, {})
        cursor = await self.db.test.aggregate(pipeline=[], cursor={"batchSize": 2})
        n = 0
        while True:
            await cursor.next()
            n += 1
            if n == 3:
                self.assertFalse(cursor.alive)
                break

            self.assertTrue(cursor.alive)

    async def test_invalid_session_parameter(self):
        async def try_invalid_session():
            with await self.db.test.aggregate([], {}):  # type:ignore
                pass

        with self.assertRaisesRegex(ValueError, "must be an AsyncClientSession"):
            await try_invalid_session()

    async def test_large_limit(self):
        db = self.db
        await db.drop_collection("test_large_limit")
        await db.test_large_limit.create_index([("x", 1)])
        my_str = "mongomongo" * 1000

        await db.test_large_limit.insert_many({"x": i, "y": my_str} for i in range(2000))

        i = 0
        y = 0
        async for doc in db.test_large_limit.find(limit=1900).sort([("x", 1)]):
            i += 1
            y += doc["x"]

        self.assertEqual(1900, i)
        self.assertEqual((1900 * 1899) / 2, y)

    async def test_find_kwargs(self):
        db = self.db
        await db.drop_collection("test")
        await db.test.insert_many({"x": i} for i in range(10))

        self.assertEqual(10, await db.test.count_documents({}))

        total = 0
        async for x in db.test.find({}, skip=4, limit=2):
            total += x["x"]

        self.assertEqual(9, total)

    async def test_rename(self):
        db = self.db
        await db.drop_collection("test")
        await db.drop_collection("foo")

        with self.assertRaises(TypeError):
            await db.test.rename(5)  # type: ignore[arg-type]
        with self.assertRaises(InvalidName):
            await db.test.rename("")
        with self.assertRaises(InvalidName):
            await db.test.rename("te$t")
        with self.assertRaises(InvalidName):
            await db.test.rename(".test")
        with self.assertRaises(InvalidName):
            await db.test.rename("test.")
        with self.assertRaises(InvalidName):
            await db.test.rename("tes..t")

        self.assertEqual(0, await db.test.count_documents({}))
        self.assertEqual(0, await db.foo.count_documents({}))

        await db.test.insert_many({"x": i} for i in range(10))

        self.assertEqual(10, await db.test.count_documents({}))

        await db.test.rename("foo")

        self.assertEqual(0, await db.test.count_documents({}))
        self.assertEqual(10, await db.foo.count_documents({}))

        x = 0
        async for doc in db.foo.find():
            self.assertEqual(x, doc["x"])
            x += 1

        await db.test.insert_one({})
        with self.assertRaises(OperationFailure):
            await db.foo.rename("test")
        await db.foo.rename("test", dropTarget=True)

        with self.write_concern_collection() as coll:
            await coll.rename("foo")

    @no_type_check
    async def test_find_one(self):
        db = self.db
        await db.drop_collection("test")

        _id = (await db.test.insert_one({"hello": "world", "foo": "bar"})).inserted_id

        self.assertEqual("world", (await db.test.find_one())["hello"])
        self.assertEqual(await db.test.find_one(_id), await db.test.find_one())
        self.assertEqual(await db.test.find_one(None), await db.test.find_one())
        self.assertEqual(await db.test.find_one({}), await db.test.find_one())
        self.assertEqual(await db.test.find_one({"hello": "world"}), await db.test.find_one())

        self.assertIn("hello", await db.test.find_one(projection=["hello"]))
        self.assertNotIn("hello", await db.test.find_one(projection=["foo"]))

        self.assertIn("hello", await db.test.find_one(projection=("hello",)))
        self.assertNotIn("hello", await db.test.find_one(projection=("foo",)))

        self.assertIn("hello", await db.test.find_one(projection={"hello"}))
        self.assertNotIn("hello", await db.test.find_one(projection={"foo"}))

        self.assertIn("hello", await db.test.find_one(projection=frozenset(["hello"])))
        self.assertNotIn("hello", await db.test.find_one(projection=frozenset(["foo"])))

        self.assertEqual(["_id"], list(await db.test.find_one(projection={"_id": True})))
        self.assertIn("hello", list(await db.test.find_one(projection={})))
        self.assertIn("hello", list(await db.test.find_one(projection=[])))

        self.assertEqual(None, await db.test.find_one({"hello": "foo"}))
        self.assertEqual(None, await db.test.find_one(ObjectId()))

    async def test_find_one_non_objectid(self):
        db = self.db
        await db.drop_collection("test")

        await db.test.insert_one({"_id": 5})

        self.assertTrue(await db.test.find_one(5))
        self.assertFalse(await db.test.find_one(6))

    async def test_find_one_with_find_args(self):
        db = self.db
        await db.drop_collection("test")

        await db.test.insert_many([{"x": i} for i in range(1, 4)])

        self.assertEqual(1, (await db.test.find_one())["x"])
        self.assertEqual(2, (await db.test.find_one(skip=1, limit=2))["x"])

    async def test_find_with_sort(self):
        db = self.db
        await db.drop_collection("test")

        await db.test.insert_many([{"x": 2}, {"x": 1}, {"x": 3}])

        self.assertEqual(2, (await db.test.find_one())["x"])
        self.assertEqual(1, (await db.test.find_one(sort=[("x", 1)]))["x"])
        self.assertEqual(3, (await db.test.find_one(sort=[("x", -1)]))["x"])

        async def to_list(things):
            return [thing["x"] async for thing in things]

        self.assertEqual([2, 1, 3], await to_list(db.test.find()))
        self.assertEqual([1, 2, 3], await to_list(db.test.find(sort=[("x", 1)])))
        self.assertEqual([3, 2, 1], await to_list(db.test.find(sort=[("x", -1)])))

        with self.assertRaises(TypeError):
            await db.test.find(sort=5)
        with self.assertRaises(TypeError):
            await db.test.find(sort="hello")
        with self.assertRaises(TypeError):
            await db.test.find(sort=["hello", 1])

    # TODO doesn't actually test functionality, just that it doesn't blow up
    async def test_cursor_timeout(self):
        await self.db.test.find(no_cursor_timeout=True).to_list()
        await self.db.test.find(no_cursor_timeout=False).to_list()

    async def test_exhaust(self):
        if await async_is_mongos(self.db.client):
            with self.assertRaises(InvalidOperation):
                await anext(self.db.test.find(cursor_type=CursorType.EXHAUST))
            return

        # Limit is incompatible with exhaust.
        with self.assertRaises(InvalidOperation):
            await anext(self.db.test.find(cursor_type=CursorType.EXHAUST, limit=5))
        cur = self.db.test.find(cursor_type=CursorType.EXHAUST)
        with self.assertRaises(InvalidOperation):
            cur.limit(5)
            await cur.next()
        cur = self.db.test.find(limit=5)
        with self.assertRaises(InvalidOperation):
            await cur.add_option(64)
        cur = self.db.test.find()
        await cur.add_option(64)
        with self.assertRaises(InvalidOperation):
            cur.limit(5)

        await self.db.drop_collection("test")
        # Insert enough documents to require more than one batch
        await self.db.test.insert_many([{"i": i} for i in range(150)])

        client = await self.async_rs_or_single_client(maxPoolSize=1)
        pool = await async_get_pool(client)

        # Make sure the socket is returned after exhaustion.
        cur = client[self.db.name].test.find(cursor_type=CursorType.EXHAUST)
        await anext(cur)
        self.assertEqual(0, len(pool.conns))
        async for _ in cur:
            pass
        self.assertEqual(1, len(pool.conns))

        # Same as previous but don't call next()
        async for _ in client[self.db.name].test.find(cursor_type=CursorType.EXHAUST):
            pass
        self.assertEqual(1, len(pool.conns))

        # If the Cursor instance is discarded before being completely iterated
        # and the socket has pending data (more_to_come=True) we have to close
        # and discard the socket.
        cur = client[self.db.name].test.find(cursor_type=CursorType.EXHAUST, batch_size=2)
        if async_client_context.version.at_least(4, 2):
            # On 4.2+ we use OP_MSG which only sets more_to_come=True after the
            # first getMore.
            for _ in range(3):
                await anext(cur)
        else:
            await anext(cur)
        self.assertEqual(0, len(pool.conns))
        # if sys.platform.startswith("java") or "PyPy" in sys.version:
        #     # Don't wait for GC or use gc.collect(), it's unreliable.
        await cur.close()
        cur = None
        # Wait until the background thread returns the socket.
        await async_wait_until(lambda: pool.active_sockets == 0, "return socket")
        # The socket should be discarded.
        self.assertEqual(0, len(pool.conns))

    async def test_distinct(self):
        await self.db.drop_collection("test")

        test = self.db.test
        await test.insert_many([{"a": 1}, {"a": 2}, {"a": 2}, {"a": 2}, {"a": 3}])

        distinct = await test.distinct("a")
        distinct.sort()

        self.assertEqual([1, 2, 3], distinct)

        distinct = await test.find({"a": {"$gt": 1}}).distinct("a")
        distinct.sort()
        self.assertEqual([2, 3], distinct)

        distinct = await test.distinct("a", {"a": {"$gt": 1}})
        distinct.sort()
        self.assertEqual([2, 3], distinct)

        await self.db.drop_collection("test")

        await test.insert_one({"a": {"b": "a"}, "c": 12})
        await test.insert_one({"a": {"b": "b"}, "c": 12})
        await test.insert_one({"a": {"b": "c"}, "c": 12})
        await test.insert_one({"a": {"b": "c"}, "c": 12})

        distinct = await test.distinct("a.b")
        distinct.sort()

        self.assertEqual(["a", "b", "c"], distinct)

    async def test_query_on_query_field(self):
        await self.db.drop_collection("test")
        await self.db.test.insert_one({"query": "foo"})
        await self.db.test.insert_one({"bar": "foo"})

        self.assertEqual(1, await self.db.test.count_documents({"query": {"$ne": None}}))
        self.assertEqual(1, len(await self.db.test.find({"query": {"$ne": None}}).to_list()))

    async def test_min_query(self):
        await self.db.drop_collection("test")
        await self.db.test.insert_many([{"x": 1}, {"x": 2}])
        await self.db.test.create_index("x")

        cursor = self.db.test.find({"$min": {"x": 2}, "$query": {}}, hint="x_1")

        docs = await cursor.to_list()
        self.assertEqual(1, len(docs))
        self.assertEqual(2, docs[0]["x"])

    async def test_numerous_inserts(self):
        # Ensure we don't exceed server's maxWriteBatchSize size limit.
        await self.db.test.drop()
        n_docs = await async_client_context.max_write_batch_size + 100
        await self.db.test.insert_many([{} for _ in range(n_docs)])
        self.assertEqual(n_docs, await self.db.test.count_documents({}))
        await self.db.test.drop()

    async def test_insert_many_large_batch(self):
        # Tests legacy insert.
        db = self.client.test_insert_large_batch
        self.addAsyncCleanup(self.client.drop_database, "test_insert_large_batch")
        max_bson_size = await async_client_context.max_bson_size
        # Write commands are limited to 16MB + 16k per batch
        big_string = "x" * int(max_bson_size / 2)

        # Batch insert that requires 2 batches.
        successful_insert = [
            {"x": big_string},
            {"x": big_string},
            {"x": big_string},
            {"x": big_string},
        ]
        await db.collection_0.insert_many(successful_insert)
        self.assertEqual(4, await db.collection_0.count_documents({}))

        await db.collection_0.drop()

        # Test that inserts fail after first error.
        insert_second_fails = [
            {"_id": "id0", "x": big_string},
            {"_id": "id0", "x": big_string},
            {"_id": "id1", "x": big_string},
            {"_id": "id2", "x": big_string},
        ]

        with self.assertRaises(BulkWriteError):
            await db.collection_1.insert_many(insert_second_fails)

        self.assertEqual(1, await db.collection_1.count_documents({}))

        await db.collection_1.drop()

        # 2 batches, 2nd insert fails, unacknowledged, ordered.
        unack_coll = db.collection_2.with_options(write_concern=WriteConcern(w=0))
        await unack_coll.insert_many(insert_second_fails)

        async def async_lambda():
            return await db.collection_2.count_documents({}) == 1

        await async_wait_until(async_lambda, "insert 1 document", timeout=60)

        await db.collection_2.drop()

        # 2 batches, ids of docs 0 and 1 are dupes, ids of docs 2 and 3 are
        # dupes. Acknowledged, unordered.
        insert_two_failures = [
            {"_id": "id0", "x": big_string},
            {"_id": "id0", "x": big_string},
            {"_id": "id1", "x": big_string},
            {"_id": "id1", "x": big_string},
        ]

        with self.assertRaises(OperationFailure) as context:
            await db.collection_3.insert_many(insert_two_failures, ordered=False)

        self.assertIn("id1", str(context.exception))

        # Only the first and third documents should be inserted.
        self.assertEqual(2, await db.collection_3.count_documents({}))

        await db.collection_3.drop()

        # 2 batches, 2 errors, unacknowledged, unordered.
        unack_coll = db.collection_4.with_options(write_concern=WriteConcern(w=0))
        await unack_coll.insert_many(insert_two_failures, ordered=False)

        async def async_lambda():
            return await db.collection_4.count_documents({}) == 2

        # Only the first and third documents are inserted.
        await async_wait_until(async_lambda, "insert 2 documents", timeout=60)

        await db.collection_4.drop()

    async def test_messages_with_unicode_collection_names(self):
        db = self.db

        await db["Employés"].insert_one({"x": 1})
        await db["Employés"].replace_one({"x": 1}, {"x": 2})
        await db["Employés"].delete_many({})
        await db["Employés"].find_one()
        await db["Employés"].find().to_list()

    async def test_drop_indexes_non_existent(self):
        await self.db.drop_collection("test")
        await self.db.test.drop_indexes()

    # This is really a bson test but easier to just reproduce it here...
    # (Shame on me)
    async def test_bad_encode(self):
        c = self.db.test
        await c.drop()
        with self.assertRaises(InvalidDocument):
            await c.insert_one({"x": c})

        class BadGetAttr(dict):
            def __getattr__(self, name):
                pass

        bad = BadGetAttr([("foo", "bar")])
        await c.insert_one({"bad": bad})
        self.assertEqual("bar", (await c.find_one())["bad"]["foo"])  # type: ignore

    async def test_array_filters_validation(self):
        # array_filters must be a list.
        c = self.db.test
        with self.assertRaises(TypeError):
            await c.update_one({}, {"$set": {"a": 1}}, array_filters={})  # type: ignore[arg-type]
        with self.assertRaises(TypeError):
            await c.update_many({}, {"$set": {"a": 1}}, array_filters={})  # type: ignore[arg-type]
        with self.assertRaises(TypeError):
            update = {"$set": {"a": 1}}
            await c.find_one_and_update({}, update, array_filters={})  # type: ignore[arg-type]

    async def test_array_filters_unacknowledged(self):
        c_w0 = self.db.test.with_options(write_concern=WriteConcern(w=0))
        with self.assertRaises(ConfigurationError):
            await c_w0.update_one({}, {"$set": {"y.$[i].b": 5}}, array_filters=[{"i.b": 1}])
        with self.assertRaises(ConfigurationError):
            await c_w0.update_many({}, {"$set": {"y.$[i].b": 5}}, array_filters=[{"i.b": 1}])
        with self.assertRaises(ConfigurationError):
            await c_w0.find_one_and_update(
                {}, {"$set": {"y.$[i].b": 5}}, array_filters=[{"i.b": 1}]
            )

    async def test_find_one_and(self):
        c = self.db.test
        await c.drop()
        await c.insert_one({"_id": 1, "i": 1})

        self.assertEqual(
            {"_id": 1, "i": 1}, await c.find_one_and_update({"_id": 1}, {"$inc": {"i": 1}})
        )
        self.assertEqual(
            {"_id": 1, "i": 3},
            await c.find_one_and_update(
                {"_id": 1}, {"$inc": {"i": 1}}, return_document=ReturnDocument.AFTER
            ),
        )

        self.assertEqual({"_id": 1, "i": 3}, await c.find_one_and_delete({"_id": 1}))
        self.assertEqual(None, await c.find_one({"_id": 1}))

        self.assertEqual(None, await c.find_one_and_update({"_id": 1}, {"$inc": {"i": 1}}))
        self.assertEqual(
            {"_id": 1, "i": 1},
            await c.find_one_and_update(
                {"_id": 1}, {"$inc": {"i": 1}}, return_document=ReturnDocument.AFTER, upsert=True
            ),
        )
        self.assertEqual(
            {"_id": 1, "i": 2},
            await c.find_one_and_update(
                {"_id": 1}, {"$inc": {"i": 1}}, return_document=ReturnDocument.AFTER
            ),
        )

        self.assertEqual(
            {"_id": 1, "i": 3},
            await c.find_one_and_replace(
                {"_id": 1}, {"i": 3, "j": 1}, projection=["i"], return_document=ReturnDocument.AFTER
            ),
        )
        self.assertEqual(
            {"i": 4},
            await c.find_one_and_update(
                {"_id": 1},
                {"$inc": {"i": 1}},
                projection={"i": 1, "_id": 0},
                return_document=ReturnDocument.AFTER,
            ),
        )

        await c.drop()
        for j in range(5):
            await c.insert_one({"j": j, "i": 0})

        sort = [("j", DESCENDING)]
        self.assertEqual(4, (await c.find_one_and_update({}, {"$inc": {"i": 1}}, sort=sort))["j"])

    async def test_find_one_and_write_concern(self):
        listener = OvertCommandListener()
        db = (await self.async_single_client(event_listeners=[listener]))[self.db.name]
        # non-default WriteConcern.
        c_w0 = db.get_collection("test", write_concern=WriteConcern(w=0))
        # default WriteConcern.
        c_default = db.get_collection("test", write_concern=WriteConcern())
        # Authenticate the client and throw out auth commands from the listener.
        await db.command("ping")
        listener.reset()
        await c_w0.find_one_and_update({"_id": 1}, {"$set": {"foo": "bar"}})
        self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"])
        listener.reset()

        await c_w0.find_one_and_replace({"_id": 1}, {"foo": "bar"})
        self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"])
        listener.reset()

        await c_w0.find_one_and_delete({"_id": 1})
        self.assertEqual({"w": 0}, listener.started_events[0].command["writeConcern"])
        listener.reset()

        # Test write concern errors.
        if async_client_context.is_rs:
            c_wc_error = db.get_collection(
                "test", write_concern=WriteConcern(w=len(async_client_context.nodes) + 1)
            )
            with self.assertRaises(WriteConcernError):
                await c_wc_error.find_one_and_update({"_id": 1}, {"$set": {"foo": "bar"}})
            with self.assertRaises(WriteConcernError):
                await c_wc_error.find_one_and_replace(
                    {"w": 0}, listener.started_events[0].command["writeConcern"]
                )
            with self.assertRaises(WriteConcernError):
                await c_wc_error.find_one_and_delete(
                    {"w": 0}, listener.started_events[0].command["writeConcern"]
                )
            listener.reset()

        await c_default.find_one_and_update({"_id": 1}, {"$set": {"foo": "bar"}})
        self.assertNotIn("writeConcern", listener.started_events[0].command)
        listener.reset()

        await c_default.find_one_and_replace({"_id": 1}, {"foo": "bar"})
        self.assertNotIn("writeConcern", listener.started_events[0].command)
        listener.reset()

        await c_default.find_one_and_delete({"_id": 1})
        self.assertNotIn("writeConcern", listener.started_events[0].command)
        listener.reset()

    async def test_find_with_nested(self):
        c = self.db.test
        await c.drop()
        await c.insert_many([{"i": i} for i in range(5)])  # [0, 1, 2, 3, 4]
        self.assertEqual(
            [2],
            [
                i["i"]
                async for i in c.find(
                    {
                        "$and": [
                            {
                                # This clause gives us [1,2,4]
                                "$or": [
                                    {"i": {"$lte": 2}},
                                    {"i": {"$gt": 3}},
                                ],
                            },
                            {
                                # This clause gives us [2,3]
                                "$or": [
                                    {"i": 2},
                                    {"i": 3},
                                ]
                            },
                        ]
                    }
                )
            ],
        )

        self.assertEqual(
            [0, 1, 2],
            [
                i["i"]
                async for i in c.find(
                    {
                        "$or": [
                            {
                                # This clause gives us [2]
                                "$and": [
                                    {"i": {"$gte": 2}},
                                    {"i": {"$lt": 3}},
                                ],
                            },
                            {
                                # This clause gives us [0,1]
                                "$and": [
                                    {"i": {"$gt": -100}},
                                    {"i": {"$lt": 2}},
                                ]
                            },
                        ]
                    }
                )
            ],
        )

    async def test_find_regex(self):
        c = self.db.test
        await c.drop()
        await c.insert_one({"r": re.compile(".*")})

        self.assertIsInstance((await c.find_one())["r"], Regex)  # type: ignore
        async for doc in c.find():
            self.assertIsInstance(doc["r"], Regex)

    def test_find_command_generation(self):
        cmd = _gen_find_command(
            "coll",
            {"$query": {"foo": 1}, "$dumb": 2},
            None,
            0,
            0,
            0,
            None,
            DEFAULT_READ_CONCERN,
            None,
            None,
        )
        self.assertEqual(cmd, {"find": "coll", "$dumb": 2, "filter": {"foo": 1}})

    def test_bool(self):
        with self.assertRaises(NotImplementedError):
            bool(AsyncCollection(self.db, "test"))

    @async_client_context.require_version_min(5, 0, 0)
    async def test_helpers_with_let(self):
        c = self.db.test

        async def afind(*args, **kwargs):
            return c.find(*args, **kwargs)

        helpers = [
            (c.delete_many, ({}, {})),
            (c.delete_one, ({}, {})),
            (afind, ({})),
            (c.update_many, ({}, {"$inc": {"x": 3}})),
            (c.update_one, ({}, {"$inc": {"x": 3}})),
            (c.find_one_and_delete, ({}, {})),
            (c.find_one_and_replace, ({}, {})),
            (c.aggregate, ([],)),
        ]
        for let in [10, "str", [], False]:
            for helper, args in helpers:
                with self.assertRaisesRegex(TypeError, "let must be an instance of dict"):
                    await helper(*args, let=let)  # type: ignore
        for helper, args in helpers:
            await helper(*args, let={})  # type: ignore


if __name__ == "__main__":
    unittest.main()
